#coding:utf8
import time

from pyspark.sql import *
import os
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as f
from pyspark import *

if __name__ == '__main__':
    spark=SparkSession.builder.appName("test11_wordcount_demo")\
        .master("local[*]")\
        .config("spark.sql.shuffle.partitions",2) \
        .getOrCreate()
    """
    spark.sql.shuffle.partitions 参数指的是，在sql计算中，shuffle算子阶段默认的分区数是200个
    对于集群模式来说，200也不算多
    这个参数和spark rdd中设置并行度的参数是相互独立的
    """

    sc=spark.sparkContext

    localhost_path ="file://"+os.getcwd()+"/../data/input/sql/u.data"
    # localhost_path =os.getcwd()+"/../data/input/sql/u.data"
    schema=StructType().add("user_id",StringType(),True)\
        .add("movie_id",IntegerType(),True)\
        .add("rank",IntegerType(),True)\
        .add("ts",StringType(),True)
    df=spark.read.csv(localhost_path,schema,header=False,sep='\t',encoding="utf-8")

    # TODO 1: 用户平均分
    df.groupBy("user_id").avg("rank")\
        .withColumn("avg(rank)",f.round("avg(rank)",2))\
        .orderBy("avg(rank)",ascending=False).show()
    # TODO 2: 电影的平均分查询
    df.groupBy("movie_id").avg("rank")\
        .withColumnRenamed("avg(rank)","avg_rank")\
        .withColumn("avg_rank",f.round("avg_rank",2))\
        .orderBy("avg_rank",ascending=False).show()
    # TODO 3: 查询大于平均分的电影的数量 # Row
    print("大于电影平均分电影的数量@",df.where(df["rank"] > df.select(f.avg("rank")).first()["avg(rank)"]).count(),sep="")
    # TODO 4: 查询高分电影中(>3)打分次数最多的用户, 此人打分的平均分
    user_id=df.where("rank > 3").groupBy("user_id").count()\
        .orderBy("count",ascending=False).limit(1).first()["user_id"]
    print(user_id)
    df.where(df["user_id"]==user_id).groupBy("user_id").avg("rank")\
        .select("user_id",f.round("avg(rank)",2)).show()
    # TODO 5: 查询每个用户的平局打分, 最低打分, 最高打分
    df.groupBy("user_id").agg(
        # alias 列重命名
        f.round(f.avg("rank"),2).alias("avg_rank")
        ,f.min("rank")
        ,f.max("rank")
    ).show()
    # TODO 6: 查询评分超过100次的电影, 的平均分 排名 TOP10
    df.groupBy("movie_id").agg(
        f.count("movie_id").alias("cnt")
        ,f.round(f.avg("rank"),2).alias("avg_rank")
    ).where("cnt > 100").orderBy("avg_rank",ascending=False).limit(10).show()

    time.sleep(100000)
"""
1. agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
2. alias: 它是Column对象的API, 可以针对一个列 进行改名
3. withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用
4. orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False
5. first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.
# Row对象 就是一个数组, 你可以通过row['列名'] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
"""






